// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.alter;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.AlterCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelDecommissionBackendCommand;
import org.apache.doris.nereids.trees.plans.commands.info.AddBackendOp;
import org.apache.doris.nereids.trees.plans.commands.info.AddBrokerOp;
import org.apache.doris.nereids.trees.plans.commands.info.AddFollowerOp;
import org.apache.doris.nereids.trees.plans.commands.info.AddObserverOp;
import org.apache.doris.nereids.trees.plans.commands.info.AlterOp;
import org.apache.doris.nereids.trees.plans.commands.info.DecommissionBackendOp;
import org.apache.doris.nereids.trees.plans.commands.info.DropAllBrokerOp;
import org.apache.doris.nereids.trees.plans.commands.info.DropBackendOp;
import org.apache.doris.nereids.trees.plans.commands.info.DropBrokerOp;
import org.apache.doris.nereids.trees.plans.commands.info.DropFollowerOp;
import org.apache.doris.nereids.trees.plans.commands.info.DropObserverOp;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyBackendHostNameOp;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyBackendOp;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyBrokerOp;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyFrontendHostNameOp;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyNodeHostNameOp;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/*
 * SystemHandler is for
 * 1. add/drop/decommission backends
 * 2. add/drop frontends
 * 3. add/drop/modify brokers
 */
public class SystemHandler extends AlterHandler {
    private static final Logger LOG = LogManager.getLogger(SystemHandler.class);

    // Tablets that look "leaky" on decommissioning backends:
    // backendId -> (tabletId -> first time (ms) the tablet was observed as leaky).
    // A tablet is considered leaky when it is still in the inverted index but its
    // db -> table -> partition path cannot be resolved (see checkMigrateTablets).
    // NOTE(review): appears to be touched only from the handler daemon thread
    // (runAlterJobV2 / checkMigrateTablets) — confirm before accessing elsewhere.
    private Map<Long, Map<Long, Long>> backendLeakyTablets = Maps.newHashMap();

    public SystemHandler() {
        // "system" is the handler's daemon thread name
        super("system");
    }

    @Override
    protected void runAfterCatalogReady() {
        super.runAfterCatalogReady();
        // decommission progress checking is only meaningful in non-cloud mode
        if (Config.isNotCloudMode()) {
            runAlterJobV2();
        }
    }

    // Periodic pass over all decommissioned backends: when a backend has no remaining
    // tablets (or only long-time leaky ones) and no pending group-commit WALs, drop it
    // (if Config.drop_backend_after_decommission is enabled); otherwise log what is left.
    private void runAlterJobV2() {
        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
        // forget leaky-tablet bookkeeping for backends that are gone or no longer decommissioning
        backendLeakyTablets.entrySet().removeIf(entry -> {
            long beId = entry.getKey();
            Backend backend = systemInfoService.getBackend(beId);
            return backend == null || !backend.isDecommissioned();
        });
        // check if decommission is finished
        for (Long beId : systemInfoService.getAllBackendIds(false)) {
            Backend backend = systemInfoService.getBackend(beId);
            if (backend == null || !backend.isDecommissioned()) {
                continue;
            }

            AtomicInteger totalTabletNum = new AtomicInteger(0);
            List<Long> sampleTablets = Lists.newArrayList();
            List<Long> sampleLeakyTablets = Lists.newArrayList();
            // check backend had migrated all its tablets, otherwise sample some tablets for log
            boolean migratedTablets = checkMigrateTablets(beId, 10, sampleTablets, sampleLeakyTablets, totalTabletNum);
            // pending group-commit WALs must also be drained before the backend can be dropped
            long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
            if (Config.drop_backend_after_decommission && migratedTablets && walNum == 0) {
                try {
                    systemInfoService.dropBackend(beId);
                    LOG.info("no available tablet on decommission backend {}, drop it", beId);
                } catch (DdlException e) {
                    // does not matter, may be backend not exist
                    LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage());
                }
                continue;
            }

            // decommission not finished yet: log remaining tablet/WAL counts with samples
            LOG.info("backend {} lefts {} replicas to decommission: normal tablets {}{}{}",
                    beId, totalTabletNum.get(), sampleTablets,
                    sampleLeakyTablets.isEmpty() ? "" : "; maybe leaky tablets " + sampleLeakyTablets,
                    walNum > 0 ? "; and has " + walNum + " unfinished WALs" : "");
        }
    }

    @Override
    public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
        // SystemHandler manages cluster nodes, not table alters, so there are
        // no per-database alter job infos to report.
        throw new NotImplementedException("getAlterJobInfosByDb is not supported in SystemHandler");
    }

    /**
     * Process a legacy (non-Nereids) ALTER SYSTEM operation: add/drop/decommission
     * backends, add/drop frontends, and add/drop/modify brokers.
     *
     * <p>Synchronized to avoid processing two or more statements at the same time.
     *
     * @param rawSql   original statement text (unused, kept for the AlterHandler interface)
     * @param alterOps parsed operations; exactly one op is expected per statement
     * @param dummyDb  placeholder, system alters are not bound to a database
     * @param dummyTbl placeholder, system alters are not bound to a table
     * @throws UserException if the operation is rejected or fails
     */
    @Override
    public synchronized void process(String rawSql, List<AlterOp> alterOps, Database dummyDb,
                                     OlapTable dummyTbl) throws UserException {
        Preconditions.checkArgument(alterOps.size() == 1);
        AlterOp alterOp = alterOps.get(0);

        if (alterOp instanceof AddBackendOp) {
            // add backend
            AddBackendOp addBackendOp = (AddBackendOp) alterOp;
            Env.getCurrentSystemInfo().addBackends(addBackendOp.getHostInfos(), addBackendOp.getTagMap());
        } else if (alterOp instanceof DropBackendOp) {
            // drop backend
            DropBackendOp dropBackendOp = (DropBackendOp) alterOp;
            if (!dropBackendOp.isForce()) {
                // fix: the concatenated message was missing a space after "stmt."
                throw new DdlException("It is highly NOT RECOMMENDED to use DROP BACKEND stmt. "
                        + "It is not safe to directly drop a backend. "
                        + "All data on this backend will be discarded permanently. "
                        + "If you insist, use DROPP instead of DROP");
            }
            if (dropBackendOp.getHostInfos().isEmpty()) {
                // drop by backend id
                Env.getCurrentSystemInfo().dropBackendsByIds(dropBackendOp.getIds());
            } else {
                // drop by host
                Env.getCurrentSystemInfo().dropBackends(dropBackendOp.getHostInfos());
            }
        } else if (alterOp instanceof DecommissionBackendOp) {
            // decommission
            DecommissionBackendOp decommissionBackendOp = (DecommissionBackendOp) alterOp;
            // check request
            List<Backend> decommissionBackends = checkDecommission(decommissionBackendOp);

            // set backend's state as 'decommissioned'
            // for decommission operation, there is no decommission job. the system handler will
            // periodically check all backends in decommission state (see runAlterJobV2)
            for (Backend backend : decommissionBackends) {
                Env.getCurrentSystemInfo().decommissionBackend(backend);
            }

        } else if (alterOp instanceof AddObserverOp) {
            AddObserverOp op = (AddObserverOp) alterOp;
            Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, op.getHost(),
                    op.getPort());
        } else if (alterOp instanceof DropObserverOp) {
            DropObserverOp op = (DropObserverOp) alterOp;
            Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, op.getHost(),
                    op.getPort());
        } else if (alterOp instanceof AddFollowerOp) {
            AddFollowerOp op = (AddFollowerOp) alterOp;
            Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, op.getHost(),
                    op.getPort());
        } else if (alterOp instanceof DropFollowerOp) {
            DropFollowerOp op = (DropFollowerOp) alterOp;
            Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, op.getHost(),
                    op.getPort());
        } else if (alterOp instanceof ModifyBrokerOp) {
            ModifyBrokerOp modifyBrokerOp = (ModifyBrokerOp) alterOp;
            Env.getCurrentEnv().getBrokerMgr().execute(modifyBrokerOp);
        } else if (alterOp instanceof ModifyBackendOp) {
            Env.getCurrentSystemInfo().modifyBackends((ModifyBackendOp) alterOp);
        } else if (alterOp instanceof ModifyFrontendHostNameOp) {
            ModifyFrontendHostNameOp op = (ModifyFrontendHostNameOp) alterOp;
            Env.getCurrentEnv().modifyFrontendHostName(op.getHost(), op.getPort(), op.getNewHost());
        } else if (alterOp instanceof ModifyBackendHostNameOp) {
            Env.getCurrentSystemInfo().modifyBackendHost((ModifyBackendHostNameOp) alterOp);
        } else {
            // unreachable unless a new AlterOp subtype is added without a branch here
            Preconditions.checkState(false, alterOp.getClass());
        }
    }

    /**
     * Process an ALTER SYSTEM command coming from the Nereids planner. Dispatches on the
     * command's {@code PlanType} to the same underlying operations as the legacy path.
     *
     * <p>Synchronized to avoid processing two or more statements at the same time.
     *
     * @param rawSql        original statement text (unused, kept for the AlterHandler interface)
     * @param alterCommands parsed commands; exactly one is expected per statement
     * @param dummyDb       placeholder, system alters are not bound to a database
     * @param dummyTbl      placeholder, system alters are not bound to a table
     * @throws UserException if the command type is unsupported or the operation fails
     */
    @Override
    public synchronized void processForNereids(String rawSql, List<AlterCommand> alterCommands,
                                     Database dummyDb,
                                     OlapTable dummyTbl) throws UserException {
        Preconditions.checkArgument(alterCommands.size() == 1);
        AlterCommand alterCommand = alterCommands.get(0);
        // guard clause instead of wrapping the whole dispatch in an if/else
        if (!(alterCommand instanceof AlterSystemCommand)) {
            throw new UserException("Not supported alter command type " + alterCommand.getType());
        }
        AlterSystemCommand alterSystemCommand = (AlterSystemCommand) alterCommand;
        if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_ADD_BACKEND)) {
            // add backend
            AddBackendOp op = (AddBackendOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentSystemInfo().addBackends(op.getHostInfos(), op.getTagMap());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DROP_BACKEND)) {
            // drop backend
            DropBackendOp op = (DropBackendOp) alterSystemCommand.getAlterSystemOp();
            if (!op.isForce()) {
                // fix: the concatenated message was missing a space after "stmt."
                throw new DdlException("It is highly NOT RECOMMENDED to use DROP BACKEND stmt. "
                        + "It is not safe to directly drop a backend. "
                        + "All data on this backend will be discarded permanently. "
                        + "If you insist, use DROPP instead of DROP");
            }
            if (op.getHostInfos().isEmpty()) {
                // drop by backend id
                Env.getCurrentSystemInfo().dropBackendsByIds(op.getIds());
            } else {
                // drop by host
                Env.getCurrentSystemInfo().dropBackends(op.getHostInfos());
            }
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DECOMMISSION_BACKEND)) {
            // decommission
            DecommissionBackendOp op = (DecommissionBackendOp) alterSystemCommand.getAlterSystemOp();
            // check request
            List<Backend> decommissionBackends = checkDecommissionForNereids(op);

            // set backend's state as 'decommissioned'
            // for decommission operation, there is no decommission job. the system handler will
            // periodically check all backends in decommission state (see runAlterJobV2)
            for (Backend backend : decommissionBackends) {
                Env.getCurrentSystemInfo().decommissionBackend(backend);
            }
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_ADD_OBSERVER)) {
            AddObserverOp op = (AddObserverOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, op.getHost(), op.getPort());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DROP_OBSERVER)) {
            DropObserverOp op = (DropObserverOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, op.getHost(), op.getPort());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_ADD_FOLLOWER)) {
            AddFollowerOp op = (AddFollowerOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, op.getHost(), op.getPort());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DROP_FOLLOWER)) {
            DropFollowerOp op = (DropFollowerOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, op.getHost(), op.getPort());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_ADD_BROKER)) {
            AddBrokerOp op = (AddBrokerOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().getBrokerMgr().addBrokers(op.getBrokerName(), op.getHostPortPairs());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DROP_BROKER)) {
            DropBrokerOp op = (DropBrokerOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().getBrokerMgr().dropBrokers(op.getBrokerName(), op.getHostPortPairs());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_DROP_ALL_BROKER)) {
            DropAllBrokerOp op = (DropAllBrokerOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentEnv().getBrokerMgr().dropAllBroker(op.getBrokerName());
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_MODIFY_BACKEND)) {
            ModifyBackendOp op = (ModifyBackendOp) alterSystemCommand.getAlterSystemOp();
            Env.getCurrentSystemInfo().modifyBackends(op);
        } else if (alterSystemCommand.getType().equals(PlanType.ALTER_SYSTEM_MODIFY_FRONTEND_OR_BACKEND_HOSTNAME)) {
            ModifyNodeHostNameOp op =
                    (ModifyNodeHostNameOp) alterSystemCommand.getAlterSystemOp();
            if (op.getModifyOpType().equals(ModifyNodeHostNameOp.ModifyOpType.Frontend)) {
                Env.getCurrentEnv().modifyFrontendHostName(op.getHost(), op.getPort(), op.getNewHost());
            } else {
                Env.getCurrentSystemInfo().modifyBackendHostName(op.getHost(), op.getPort(), op.getNewHost());
            }
        } else {
            // unreachable unless a new ALTER SYSTEM plan type is added without a branch here
            Preconditions.checkState(false, alterCommand.getClass());
        }
    }

    /*
     * Check if the specified backend has migrated away all of its tablets, i.e. it can be
     * dropped, which is true when:
     * 1. the backend does not hold any tablet anymore, or
     * 2. all remaining tablets have been recycled, or have been leaky for a long time.
     *
     * A tablet is treated as "leaky" when it is still registered in the tablet inverted index
     * but its db -> table -> partition path can no longer be resolved and none of those objects
     * is in the recycle bin.
     *
     * Also fills some sample tablets for logging.
     *
     * sampleLimit: the max sample tablet num
     * sampleTablets: out param, sampled normal (non-leaky) tablets
     * sampleLeakyTablets: out param, sampled leaky tablets
     * totalTabletNum: out param, total tablet num on this backend
     */
    private boolean checkMigrateTablets(long beId, int sampleLimit, List<Long> sampleTablets,
            List<Long> sampleLeakyTablets, AtomicInteger totalTabletNum) {
        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
        totalTabletNum.set(backendTabletIds.size());
        if (backendTabletIds.isEmpty()) {
            return true;
        }
        // if too many tablets, skip the (expensive) leaky check for efficiency
        if (backendTabletIds.size() > Config.decommission_tablet_check_threshold) {
            backendTabletIds.stream().limit(sampleLimit).forEach(sampleTablets::add);
            return false;
        }
        // group the backend's tablets by meta path: dbId -> tableId -> partitionId -> tablet list
        Map<Long, Map<Long, Map<Long, List<Long>>>> tabletsMap = Maps.newHashMap();
        List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(backendTabletIds);
        for (int i = 0; i < backendTabletIds.size(); i++) {
            long tabletId = backendTabletIds.get(i);
            TabletMeta tabletMeta = tabletMetaList.get(i);
            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
                continue;
            }
            tabletsMap.computeIfAbsent(tabletMeta.getDbId(), k -> Maps.newHashMap())
                    .computeIfAbsent(tabletMeta.getTableId(), k -> Maps.newHashMap())
                    .computeIfAbsent(tabletMeta.getPartitionId(), k -> Lists.newArrayList())
                    .add(tabletId);
        }
        InternalCatalog catalog = Env.getCurrentInternalCatalog();
        CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
        long now = System.currentTimeMillis();
        // this round's leaky map replaces last round's entry for this backend
        Map<Long, Long> leakyTablets = Maps.newHashMap();
        boolean searchedFirstTime = !backendLeakyTablets.containsKey(beId);
        Map<Long, Long> lastLeakyTablets = backendLeakyTablets.computeIfAbsent(beId, k -> Maps.newHashMap());
        backendLeakyTablets.put(beId, leakyTablets);
        // keep the earliest detection time for tablets that were already leaky last round
        Consumer<List<Long>> addPartitionLeakyTablets = tabletsOfPartition -> {
            tabletsOfPartition.forEach(tabletId -> {
                leakyTablets.put(tabletId, lastLeakyTablets.getOrDefault(tabletId, now));
            });
        };
        Consumer<Map<Long, List<Long>>> addTableLeakyTablets = tabletsOfTable -> {
            tabletsOfTable.values().forEach(addPartitionLeakyTablets);
        };
        Consumer<Map<Long, Map<Long, List<Long>>>> addDbLeakyTablets = tabletsOfDb -> {
            tabletsOfDb.values().forEach(addTableLeakyTablets);
        };

        // Search backend's tablets, put up to `sampleLimit` normal tablets into sampleTablets,
        // put leaky tablets into leakyTablets.
        // The first time, it searches all of this backend's tablets.
        // Later searches stop after `sampleLimit` normal tablets, in order to reduce table locking.
        boolean searchedAllTablets = true;
        OUTER:
        for (Map.Entry<Long, Map<Long, Map<Long, List<Long>>>> dbEntry : tabletsMap.entrySet()) {
            long dbId = dbEntry.getKey();
            Database db = catalog.getDbNullable(dbId);
            if (db == null) {
                // db not found and not in the recycle bin, so its tablets should be leaky.
                if (!recycleBin.isRecycleDatabase(dbId)) {
                    addDbLeakyTablets.accept(dbEntry.getValue());
                }
                continue;
            }

            for (Map.Entry<Long, Map<Long, List<Long>>> tableEntry : dbEntry.getValue().entrySet()) {
                long tableId = tableEntry.getKey();
                Table tbl = db.getTableNullable(tableId);
                if (tbl == null || !tbl.isManagedTable()) {
                    // table not found (and not recycled): its tablets should be leaky.
                    if (!recycleBin.isRecycleTable(dbId, tableId)) {
                        addTableLeakyTablets.accept(tableEntry.getValue());
                    }
                    continue;
                }

                OlapTable olapTable = (OlapTable) tbl;
                olapTable.readLock();
                try {
                    for (Map.Entry<Long, List<Long>> partitionEntry : tableEntry.getValue().entrySet()) {
                        long partitionId = partitionEntry.getKey();
                        Partition partition = olapTable.getPartition(partitionId);
                        if (partition == null) {
                            // partition not found (and not recycled): its tablets should be leaky.
                            if (!recycleBin.isRecyclePartition(dbId, tableId, partitionId)) {
                                addPartitionLeakyTablets.accept(partitionEntry.getValue());
                            }
                            continue;
                        }
                        // at present, leaky tablets always belong to a not-found partition.
                        // so if the partition is found in the table, don't check whether the partition
                        // really contains this tablet; just treat the tablet as not leaky.
                        for (long tabletId : partitionEntry.getValue()) {
                            if (sampleTablets.size() < sampleLimit) {
                                sampleTablets.add(tabletId);
                            } else if (!searchedFirstTime) {
                                // The first search covers all tablets; later searches stop
                                // after finding `sampleLimit` normal tablets
                                // in order to reduce table locking.
                                searchedAllTablets = false;
                                break OUTER;
                            }
                        }
                    }
                } finally {
                    olapTable.readUnlock();
                }
            }
        }

        if (!searchedAllTablets) {
            // since we didn't search all tablets, some leaky tablets may have been missed.
            // so carry over the leaky tablets found by the previous search.
            // it follows that leakyTablets will contain all leaky tablets of the first search,
            // and we know the first search covered all tablets.
            leakyTablets.putAll(lastLeakyTablets);
        }
        leakyTablets.keySet().stream().limit(sampleLimit).forEach(sampleLeakyTablets::add);

        // If a tablet can't be found via the path 'db -> table -> partition' and it's not in the
        // recycle bin, we treat it as leaky — but it may not be really leaky.
        // An in-flight creation of a new partition/table can make its tablets look leaky temporarily:
        // when creating a new partition, its tablets are first added to TabletInvertedIndex,
        // but at that moment the partition hasn't been added to the table yet, so resolving the
        // tablet via 'db -> table -> partition' fails. Only after creation finishes is the
        // partition added to the table.
        //
        // So an in-flight new tablet may not be really leaky; wait for a grace period to confirm.
        long skipLeakyTs = now - Config.decommission_skip_leaky_tablet_second * 1000L;

        // if the backend has no normal tablets (sampleTablets is empty) and every leaky tablet has
        // been leaky longer than the grace period, then the backend can be dropped now.
        return sampleTablets.isEmpty() && leakyTablets.values().stream().allMatch(ts -> ts < skipLeakyTs);
    }

    /**
     * Validate a decommission request, dispatching on how the backends are addressed:
     * by id (when no host infos were given) or by host:heartbeat_port.
     */
    private List<Backend> checkDecommission(DecommissionBackendOp decommissionBackendOp)
            throws DdlException {
        boolean addressedById = decommissionBackendOp.getHostInfos().isEmpty();
        return addressedById
                ? checkDecommissionByIds(decommissionBackendOp.getIds())
                : checkDecommission(decommissionBackendOp.getHostInfos());
    }

    /**
     * Nereids entry point for decommission pre-checks. The rules are identical to the
     * legacy path, so delegate to {@link #checkDecommission(DecommissionBackendOp)}
     * instead of duplicating its body.
     */
    private List<Backend> checkDecommissionForNereids(DecommissionBackendOp decommissionBackendOp)
            throws DdlException {
        return checkDecommission(decommissionBackendOp);
    }

    /*
     * check if the specified backends can be decommissioned
     * 1. backend should exist.
     * 2. after decommission, the remaining backend num should meet the replication num.
     * 3. after decommission, the remaining space capacity can store data on decommissioned backends.
     */
    public static List<Backend> checkDecommission(List<HostInfo> hostInfos)
            throws DdlException {
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        List<Backend> candidates = Lists.newArrayList();
        // every requested host must resolve to an existing backend
        for (HostInfo hostInfo : hostInfos) {
            Backend be = infoService.getBackendWithHeartbeatPort(hostInfo.getHost(),
                    hostInfo.getPort());
            if (be == null) {
                throw new DdlException("Backend does not exist["
                        + NetUtils.getHostPortInAccessibleFormat(hostInfo.getHost(), hostInfo.getPort()) + "]");
            }
            // backends already under decommission are silently skipped
            if (!be.isDecommissioned()) {
                candidates.add(be);
            }
        }

        checkDecommissionWithReplicaAllocation(candidates);

        // TODO(cmy): check remaining space

        return candidates;
    }

    /**
     * Check whether the backends with the given ids can be decommissioned. Same rules as
     * {@link #checkDecommission(List)}, but backends are addressed by id.
     *
     * @param ids backend ids as strings, as parsed from the statement
     * @return the backends that still need decommissioning (already-decommissioned ones are skipped)
     * @throws DdlException if an id is not a valid number, a backend does not exist, or the
     *         remaining backends cannot satisfy the replica allocation
     */
    public static List<Backend> checkDecommissionByIds(List<String> ids)
            throws DdlException {
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        List<Backend> decommissionBackends = Lists.newArrayList();
        // check if exist
        for (String id : ids) {
            long backendId;
            try {
                backendId = Long.parseLong(id);
            } catch (NumberFormatException e) {
                // surface a declared DdlException instead of leaking an unchecked
                // NumberFormatException to the caller on a malformed id
                throw new DdlException("Invalid backend id: " + id);
            }
            Backend backend = infoService.getBackend(backendId);
            if (backend == null) {
                throw new DdlException("Backend does not exist, backend id is " + id);
            }
            if (backend.isDecommissioned()) {
                // already under decommission, ignore it
                continue;
            }
            decommissionBackends.add(backend);
        }

        checkDecommissionWithReplicaAllocation(decommissionBackends);

        // TODO(cmy): check remaining space

        return decommissionBackends;
    }

    /**
     * Verify that after decommissioning the given backends, every partition whose replica
     * allocation uses one of the affected tags still has enough schedule-available backends
     * on that tag to host all its replicas.
     *
     * <p>Skipped entirely in cloud mode, for an empty backend list, or when the
     * "SystemHandler.decommission_no_check_replica_num" debug point is enabled.
     *
     * @throws DdlException if some partition's replication num on a tag would exceed the
     *         number of backends remaining available with that tag
     */
    private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
            throws DdlException {
        if (Config.isCloudMode() || decommissionBackends.isEmpty()
                || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
            return;
        }

        // only tags carried by a to-be-decommissioned backend need checking
        Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
                .collect(Collectors.toSet());
        // tag -> number of backends that stay schedule-available after the decommission
        Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
        List<Backend> bes;
        try {
            bes = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().asList();
        } catch (UserException e) {
            // best effort: if the cluster's backends can't be listed, skip the check
            LOG.warn("Failed to get current cluster backend by current cluster.", e);
            return;
        }

        for (Backend backend : bes) {
            long beId = backend.getId();
            // exclude unavailable backends and the ones being decommissioned
            if (!backend.isScheduleAvailable()
                    || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
                continue;
            }

            Tag tag = backend.getLocationTag();
            if (tag != null) {
                tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1);
            }
        }

        Env env = Env.getCurrentEnv();
        List<Long> dbIds = env.getInternalCatalog().getDbIds();
        for (Long dbId : dbIds) {
            Database db = env.getInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }

            // mysql-compatible system databases carry no user partitions to check
            if (db instanceof MysqlCompatibleDatabase) {
                continue;
            }

            for (Table table : db.getTables()) {
                table.readLock();
                try {
                    if (!table.isManagedTable()) {
                        continue;
                    }

                    OlapTable tbl = (OlapTable) table;
                    for (Partition partition : tbl.getAllPartitions()) {
                        ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
                        for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
                            Tag tag = entry.getKey();
                            if (!decommissionTags.contains(tag)) {
                                continue;
                            }
                            int replicaNum = (int) entry.getValue();
                            int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
                            if (replicaNum > backendNum) {
                                throw new DdlException("After decommission, partition " + partition.getName()
                                        + " of table " + db.getName() + "." + tbl.getName()
                                        + " 's replication allocation { " + replicaAlloc
                                        + " } > available backend num " + backendNum + " on tag " + tag
                                        + ", otherwise need to decrease the partition's replication num.");
                            }
                        }
                    }
                } finally {
                    table.readUnlock();
                }
            }
        }
    }

    /**
     * Cancel decommission for the backends named in the command, addressed either by
     * backend id or by host:heartbeat_port.
     *
     * @param command the CANCEL DECOMMISSION command, carrying ids or host infos
     * @throws DdlException if a referenced backend does not exist
     */
    public void cancel(CancelDecommissionBackendCommand command) throws DdlException {
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        List<HostInfo> hostInfos = command.getHostInfos();
        if (hostInfos.isEmpty()) {
            // cancel by backend id
            for (String id : command.getIds()) {
                Backend backend = infoService.getBackend(Long.parseLong(id));
                if (backend == null) {
                    throw new DdlException("Backend does not exist["
                        + id + "]");
                }
                cancelDecommission(backend);
            }
        } else {
            // cancel by host:heartbeat_port
            for (HostInfo hostInfo : hostInfos) {
                Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getHost(),
                        hostInfo.getPort());
                if (backend == null) {
                    throw new DdlException("Backend does not exist["
                        + NetUtils.getHostPortInAccessibleFormat(hostInfo.getHost(), hostInfo.getPort()) + "]");
                }
                cancelDecommission(backend);
            }
        }
    }

    // Clear the decommission flag on a single backend and persist the state change.
    // Shared by both addressing modes of cancel() above.
    private static void cancelDecommission(Backend backend) {
        if (!backend.isDecommissioned()) {
            // it's ok. just log
            LOG.info("backend is not decommissioned[{}]", backend.getId());
            return;
        }
        if (backend.setDecommissioned(false)) {
            Env.getCurrentEnv().getEditLog().logBackendStateChange(backend);
        } else {
            // setDecommissioned returned false: state was already cleared concurrently
            LOG.info("backend is not decommissioned[{}]", backend.getHost());
        }
    }
}
